In [ ]:
# %%bash
# pip install tensorflow==1.7
# pip install tensorflow-transform
The bigquery-public-data:hacker_news dataset contains all Hacker News stories and comments since the site's launch in 2006. Each story includes a story id, a URL, the title, the author who made the post, the posting time, and the number of points the story received.
The objective is to build an ML model that, given the title of a story, predicts the source of the story.
This notebook illustrates how to build a TF premade estimator, namely DNNClassifier, where the input text is represented as TF.IDF features computed during the preprocessing phase in Part 1. The overall steps are as follows:
1. Define the transformed metadata produced by the tf.Transform pipeline in Part 1.
2. Build a TFRecord input function that parses the transformed examples.
3. Create feature columns that turn the bag-of-words indices and TF.IDF weights into a dense input.
4. Create the DNNClassifier and train it with train_and_evaluate.
5. Export the model with a serving input function that applies the saved transform to raw titles, then use it for prediction.
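For context, the bow and weight features consumed later in this notebook are assumed to have been produced by a tf.Transform preprocessing function along these lines in Part 1 (a minimal sketch using tf.Transform 0.6-era APIs such as tft.string_to_int and tft.tfidf, not the actual Part 1 code):

import tensorflow as tf
import tensorflow_transform as tft

VOCAB_SIZE = 20000      # mirrors the constant defined later in this notebook
DELIMITERS = '.,!?() '  # mirrors the constant defined later in this notebook

def preprocess_fn(input_features):
    # Tokenize the raw title on the same delimiters used for the vocabulary.
    tokens = tf.string_split(input_features['title'], DELIMITERS)
    # Map tokens to integer ids over a VOCAB_SIZE vocabulary (OOV handling is an assumption).
    token_ids = tft.string_to_int(tokens, top_k=VOCAB_SIZE, num_oov_buckets=1)
    # Compute TF.IDF: the word indices become 'bow', their weights become 'weight'.
    bow_indices, tfidf_weights = tft.tfidf(token_ids, VOCAB_SIZE + 1)
    return {
        'key': input_features['key'],
        'bow': bow_indices,
        'weight': tfidf_weights,
        'source': input_features['source'],
    }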
In [1]:
import os
class Params:
    pass
# Set to run on GCP
Params.GCP_PROJECT_ID = 'ksalama-gcp-playground'
Params.REGION = 'europe-west1'
Params.BUCKET = 'ksalama-gcs-cloudml'
Params.PLATFORM = 'local' # local | GCP
Params.DATA_DIR = 'data/news' if Params.PLATFORM == 'local' else 'gs://{}/data/news'.format(Params.BUCKET)
Params.TRANSFORMED_DATA_DIR = os.path.join(Params.DATA_DIR, 'transformed')
Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'train')
Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'eval')
Params.TEMP_DIR = os.path.join(Params.DATA_DIR, 'tmp')
Params.MODELS_DIR = 'models/news' if Params.PLATFORM == 'local' else 'gs://{}/models/news'.format(Params.BUCKET)
Params.TRANSFORM_ARTEFACTS_DIR = os.path.join(Params.MODELS_DIR,'transform')
Params.TRAIN = True
Params.RESUME_TRAINING = False
Params.EAGER = False
if Params.EAGER:
    tf.enable_eager_execution()
In [2]:
import tensorflow as tf
from tensorflow import data
from tensorflow.contrib.learn.python.learn.utils import input_fn_utils
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.saved import saved_transform_io
print(tf.__version__)
In [3]:
RAW_HEADER = 'key,title,source'.split(',')
RAW_DEFAULTS = [['NA'],['NA'],['NA']]
TARGET_FEATURE_NAME = 'source'
TARGET_LABELS = ['github', 'nytimes', 'techcrunch']
TEXT_FEATURE_NAME = 'title'
KEY_COLUMN = 'key'
VOCAB_SIZE = 20000
TRAIN_SIZE = 73124
EVAL_SIZE = 23079
DELIMITERS = '.,!?() '
raw_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema({
    KEY_COLUMN: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
    TEXT_FEATURE_NAME: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
    TARGET_FEATURE_NAME: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
}))
transformed_metadata = metadata_io.read_metadata(
    os.path.join(Params.TRANSFORM_ARTEFACTS_DIR, "transformed_metadata"))
raw_feature_spec = raw_metadata.schema.as_feature_spec()
transformed_feature_spec = transformed_metadata.schema.as_feature_spec()
print(transformed_feature_spec)
In [4]:
def parse_tf_example(tf_example):
    parsed_features = tf.parse_single_example(serialized=tf_example, features=transformed_feature_spec)
    target = parsed_features.pop(TARGET_FEATURE_NAME)
    return parsed_features, target

def generate_tfrecords_input_fn(files_pattern,
                                mode=tf.estimator.ModeKeys.EVAL,
                                num_epochs=1,
                                batch_size=200):
    def _input_fn():
        file_names = data.Dataset.list_files(files_pattern)
        if Params.EAGER:
            print(file_names)
        dataset = data.TFRecordDataset(file_names)
        dataset = dataset.apply(
            tf.contrib.data.shuffle_and_repeat(count=num_epochs,
                                               buffer_size=batch_size * 2)
        )
        dataset = dataset.apply(
            tf.contrib.data.map_and_batch(parse_tf_example,
                                          batch_size=batch_size,
                                          num_parallel_batches=2)
        )
        # Prefetch the next batch while the current one is being consumed.
        dataset = dataset.prefetch(batch_size)
        if Params.EAGER:
            return dataset
        iterator = dataset.make_one_shot_iterator()
        features, target = iterator.get_next()
        return features, target
    return _input_fn
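To sanity-check the input pipeline (a throwaway snippet that assumes the transformed TFRecords from Part 1 already exist under Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX), a single small batch can be pulled through the input function in its own graph and session:

with tf.Graph().as_default():
    features, target = generate_tfrecords_input_fn(
        Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX + "*",
        mode=tf.estimator.ModeKeys.EVAL,
        num_epochs=1,
        batch_size=5)()
    with tf.Session() as sess:
        sample_features, sample_target = sess.run([features, target])
        # Feature keys depend on what Part 1 produced, e.g. ['bow', 'key', 'weight'].
        print(sorted(sample_features.keys()))
        print(sample_target[:5])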
In [5]:
BOW_FEATURE_NAME = 'bow'
TFIDF_FEATURE_NAME = 'weight'
def create_feature_columns():
    # Get word indices from bow
    bow = tf.feature_column.categorical_column_with_identity(
        BOW_FEATURE_NAME, num_buckets=VOCAB_SIZE + 1)
    # Attach the TF.IDF weight to each word index
    weight_bow = tf.feature_column.weighted_categorical_column(
        bow, TFIDF_FEATURE_NAME)
    # Convert to a (weighted) indicator column
    weight_bow_indicators = tf.feature_column.indicator_column(weight_bow)
    return [weight_bow_indicators]
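The weighted categorical column pairs each word index from bow with its TF.IDF value from weight, and the indicator column densifies that into a VOCAB_SIZE + 1 vector whose non-zero entries are the weights. A tiny toy illustration (a 5-word vocabulary instead of VOCAB_SIZE, purely for inspection):

with tf.Graph().as_default():
    # Two "titles", each a bag of (word index, tf.idf weight) pairs.
    toy_features = {
        BOW_FEATURE_NAME: tf.SparseTensor(
            indices=[[0, 0], [0, 1], [1, 0]],
            values=[1, 3, 2],
            dense_shape=[2, 2]),
        TFIDF_FEATURE_NAME: tf.SparseTensor(
            indices=[[0, 0], [0, 1], [1, 0]],
            values=[0.5, 0.2, 0.9],
            dense_shape=[2, 2]),
    }
    toy_bow = tf.feature_column.categorical_column_with_identity(
        BOW_FEATURE_NAME, num_buckets=5)
    toy_column = tf.feature_column.indicator_column(
        tf.feature_column.weighted_categorical_column(toy_bow, TFIDF_FEATURE_NAME))
    dense = tf.feature_column.input_layer(toy_features, [toy_column])
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        # Expected: [[0.  0.5 0.  0.2 0. ]
        #            [0.  0.  0.9 0.  0. ]]
        print(sess.run(dense))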
In [6]:
def create_estimator(hparams, run_config):
    feature_columns = create_feature_columns()
    optimizer = tf.train.AdamOptimizer(learning_rate=hparams.learning_rate)
    estimator = tf.estimator.DNNClassifier(
        feature_columns=feature_columns,
        n_classes=len(TARGET_LABELS),
        label_vocabulary=TARGET_LABELS,
        hidden_units=hparams.hidden_units,
        optimizer=optimizer,
        config=run_config
    )
    return estimator
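Because the target in the transformed examples is still the raw source string, label_vocabulary=TARGET_LABELS lets DNNClassifier map 'github', 'nytimes', and 'techcrunch' to class indices internally, so no separate label-encoding step is needed.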
In [7]:
NUM_EPOCHS = 10
BATCH_SIZE = 1000
TOTAL_STEPS = (TRAIN_SIZE // BATCH_SIZE) * NUM_EPOCHS
EVAL_EVERY_SEC = 60
hparams = tf.contrib.training.HParams(
    num_epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
    learning_rate=0.01,
    hidden_units=[64, 32],
    max_steps=TOTAL_STEPS,
)

MODEL_NAME = 'dnn_estimator_tfidf'
model_dir = os.path.join(Params.MODELS_DIR, MODEL_NAME)

run_config = tf.estimator.RunConfig(
    tf_random_seed=19830610,
    log_step_count_steps=1000,
    save_checkpoints_secs=EVAL_EVERY_SEC,
    keep_checkpoint_max=1,
    model_dir=model_dir
)
print(hparams)
print("")
print("Model Directory:", run_config.model_dir)
print("Dataset Size:", TRAIN_SIZE)
print("Batch Size:", BATCH_SIZE)
print("Steps per Epoch:",TRAIN_SIZE/BATCH_SIZE)
print("Total Steps:", TOTAL_STEPS)
In [8]:
def generate_serving_input_fn():
    def _serving_fn():
        receiver_tensor = {
            'title': tf.placeholder(dtype=tf.string, shape=[None])
        }
        _, transformed_features = (
            saved_transform_io.partially_apply_saved_transform(
                os.path.join(Params.TRANSFORM_ARTEFACTS_DIR, transform_fn_io.TRANSFORM_FN_DIR),
                receiver_tensor)
        )
        return tf.estimator.export.ServingInputReceiver(
            transformed_features, receiver_tensor)
    return _serving_fn
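partially_apply_saved_transform stitches the transform graph exported by the Part 1 pipeline into the serving graph, so the exported SavedModel accepts raw title strings and applies exactly the same tokenization and TF.IDF weighting that the training data went through.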
In [9]:
train_spec = tf.estimator.TrainSpec(
    input_fn=generate_tfrecords_input_fn(
        Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX + "*",
        mode=tf.estimator.ModeKeys.TRAIN,
        num_epochs=hparams.num_epochs,
        batch_size=hparams.batch_size
    ),
    max_steps=hparams.max_steps,
    hooks=None
)

eval_spec = tf.estimator.EvalSpec(
    input_fn=generate_tfrecords_input_fn(
        Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX + "*",
        mode=tf.estimator.ModeKeys.EVAL,
        num_epochs=1,
        batch_size=hparams.batch_size
    ),
    exporters=[tf.estimator.LatestExporter(
        name="estimate",  # the name of the folder under export/ to which the model will be exported
        serving_input_receiver_fn=generate_serving_input_fn(),
        exports_to_keep=1,
        as_text=False)],
    steps=None,
    throttle_secs=EVAL_EVERY_SEC
)
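The LatestExporter writes a fresh SavedModel under <model_dir>/export/estimate after each evaluation; that is the directory the prediction cell at the end of this notebook reads from.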
In [10]:
from datetime import datetime
import shutil
if Params.TRAIN:
    if not Params.RESUME_TRAINING:
        print("Removing previous training artefacts...")
        shutil.rmtree(model_dir, ignore_errors=True)
    else:
        print("Resuming training...")

    tf.logging.set_verbosity(tf.logging.INFO)

    time_start = datetime.utcnow()
    print("Experiment started at {}".format(time_start.strftime("%H:%M:%S")))
    print(".......................................")

    estimator = create_estimator(hparams, run_config)

    tf.estimator.train_and_evaluate(
        estimator=estimator,
        train_spec=train_spec,
        eval_spec=eval_spec
    )

    time_end = datetime.utcnow()
    print(".......................................")
    print("Experiment finished at {}".format(time_end.strftime("%H:%M:%S")))
    print("")
    time_elapsed = time_end - time_start
    print("Experiment elapsed time: {} seconds".format(time_elapsed.total_seconds()))
else:
    print("Training was skipped!")
In [11]:
tf.logging.set_verbosity(tf.logging.ERROR)
estimator = create_estimator(hparams, run_config)
train_metrics = estimator.evaluate(
    input_fn=generate_tfrecords_input_fn(
        files_pattern=Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX + "*",
        mode=tf.estimator.ModeKeys.EVAL,
        batch_size=TRAIN_SIZE),
    steps=1
)
print("############################################################################################")
print("# Train Measures: {}".format(train_metrics))
print("############################################################################################")

eval_metrics = estimator.evaluate(
    input_fn=generate_tfrecords_input_fn(
        files_pattern=Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX + "*",
        mode=tf.estimator.ModeKeys.EVAL,
        batch_size=EVAL_SIZE),
    steps=1
)
print("")
print("############################################################################################")
print("# Eval Measures: {}".format(eval_metrics))
print("############################################################################################")
In [12]:
import os
export_dir = model_dir +"/export/estimate/"
saved_model_dir = os.path.join(export_dir, os.listdir(export_dir)[0])
print(saved_model_dir)
print("")
predictor_fn = tf.contrib.predictor.from_saved_model(
    export_dir=saved_model_dir,
    signature_def_key="predict"
)

output = predictor_fn(
    {
        'title': [
            'Microsoft and Google are joining forces for a new AI framework',
            'A new version of Python is mind blowing',
            'EU is investigating new data privacy policies'
        ]
    }
)
print(output)
In [ ]: